package redis

import (
	"encoding/json"
	"math/rand"
	"sync"
	"time"

	redigo "github.com/gomodule/redigo/redis"
	"github.com/samuel/go-zookeeper/zk"
)

func InitCodis(setting Setting) *Pool {
	checkSetting(&setting)

	codisInstance := CodisPool{
		ZkServers: setting.ZkHost,
		ZkDir:     setting.ZkPath,
		ZkTimeout: time.Second * 2,
	}
	codisInstance.initFromZk()

	pool := &redigo.Pool{
		MaxIdle:     setting.MaxIdle,     // 池中最大空闲连接数
		MaxActive:   setting.MaxPoolSize, // 池中最大连接数,不确定可以用0（0表示自动定义）。当Wait为true时，达到最大连接数时会等待；否则直接返回errConn
		IdleTimeout: setting.IdleTimeout, // 连接关闭时间300秒
		Wait:        true,
		// 使用DialContext带计时器的连接
		Dial: func() (redigo.Conn, error) {
			codisInstance.mu.RLock()
			defer codisInstance.mu.RUnlock()

			rand.Seed(time.Now().Unix())
			idx := rand.Intn(len(codisInstance.proxyInfo))
			proxyInfo := codisInstance.proxyInfo[idx]

			return redigo.Dial(proxyInfo.ProtoType, proxyInfo.ProxyAddr,
				redigo.DialConnectTimeout(setting.ConnectTimeout),
				redigo.DialWriteTimeout(setting.DoWithTimeout),
				redigo.DialReadTimeout(setting.DoWithTimeout),
			)
		}, // 创建连接时调用的函数
		TestOnBorrow: func(c redigo.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		}, // 复用连接时，会先用此函数检查此连接是否可用
	}

	// ch为池中可用剩余连接
	// active用于总共创建的连接数
	return &Pool{
		pool:    pool,
		Name:    setting.Name,
		timeout: setting.DoWithTimeout,
		Type:    "codis",
	}
}

type proxyInfo struct {
	ProtoType string `json:"proto_type"`
	ProxyAddr string `json:"proxy_addr"`
}

type CodisPool struct {
	zk        *zk.Conn
	ZkServers []string
	ZkTimeout time.Duration
	ZkDir     string

	proxyInfo []proxyInfo
	err       error
	mu        sync.RWMutex
}

func (c *CodisPool) initFromZk() {
	c.initZk() // 连接zookeeper

	c.mu.Lock()
	defer c.mu.Unlock()

	children, _, err := c.zk.Children(c.ZkDir)

	if err != nil {
		c.err = err
		return
	}

	// 从zk获取proxy
	for _, child := range children {
		data, _, err := c.zk.Get(c.ZkDir + "/" + child)

		if err != nil {
			continue
		}

		var p proxyInfo

		json.Unmarshal(data, &p)
		if p.ProxyAddr != "" && p.ProtoType != "" {
			c.proxyInfo = append(c.proxyInfo, p)
		}
	}

	go c.watch(c.ZkDir)
}

func (c *CodisPool) watch(node string) {
	for {
		_, _, ch, err := c.zk.ChildrenW(node)
		if err != nil {
			c.err = err
			return
		}
		evt := <-ch

		if evt.Type == zk.EventSession {
			if evt.State == zk.StateConnecting {
				continue
			}
			if evt.State == zk.StateExpired {
				c.zk.Close()
				// Zookeeper session expired, reconnecting...
				c.initZk()
			}
		}
		if evt.State == zk.StateConnected {
			switch evt.Type {
			case
				zk.EventNodeCreated,
				zk.EventNodeDeleted,
				zk.EventNodeChildrenChanged,
				zk.EventNodeDataChanged:
				c.initFromZk()
				return
			}
			continue
		}
	}
}

func (c *CodisPool) initZk() {
	zkConn, _, err := zk.Connect(c.ZkServers, c.ZkTimeout)
	if err != nil {
		c.err = err
	}
	c.zk = zkConn
}
